{
  "metadata" : {
    "id" : "033cd6c0-1dcb-4a13-9367-c83c4629f49d",
    "name" : "streaming-listener-example.snb.ipynb",
    "user_save_timestamp" : "2019-05-13T00:30:13.957Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "sparkNotebook" : null,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : [ "org.apache.spark %% spark-streaming-kafka-0-8 % 2.1.0" ],
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : {
      "jars" : ""
    },
    "customVars" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "40AAFF23B9654D2A9B2E0C59515BAF8A"
    },
    "cell_type" : "markdown",
    "source" : "# Streaming Data with Kafka - With added Custom Listener"
  }, {
    "metadata" : {
      "id" : "D5BA242BD89A48F3816A0D7F53206849"
    },
    "cell_type" : "markdown",
    "source" : "In this notebook extends our Kafka job with an custom Listener that will let us visualize the lifecycle of the streaming job.\n\nAs we have explored the application logic of this job already, we can head to towards the bottom of the notebook where we implement a custom StreamingListener and register it to the StreamingContext.\n\nThen, when we start our streaming job, we can observe the different streaming events reported in a reactive UI Table widget in the notebook."
  }, {
    "metadata" : {
      "id" : "79CD566650664F04A8515F55A809A44D"
    },
    "cell_type" : "markdown",
    "source" : "\n## Our Streaming dataset will consist of sensor information, containing the sensorId, a timestamp, and a value.\nFor the sake of simplicity in this self-contained example, we are going to generate a randomized dataset, using an scenario that simulates a real IoT use case.\nThe timestamp will be the time of execution and each record will be formatted as a string coming from \"the field\" of comma separated values.\n\nWe also add a bit of real-world chaos to the data: Due to weather conditions, some sensors publish corrupt data. "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "1BA7507DA513491F8D3A1AED38F087CB"
    },
    "cell_type" : "code",
    "source" : [ "val topic = \"iot-data\"\n", "val workDir = \"/tmp/learningsparkstreaming/\"\n", "val referenceFile = \"sensor-records.parquet\"\n", "val targetFile = \"enrichedIoTStream.parquet\"\n", "val unknownSensorsTargetFile = \"unknownSensorsStream.parquet\"\n", "val kafkaBootstrapServer = \"127.0.0.1:9092\"" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "62A5D12E89B24EFB8C411495942032B1"
    },
    "cell_type" : "markdown",
    "source" : "# Load the reference data from a parquet file\nWe also cache the data to keep it in memory and improve the performance of our steaming application"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "78E6ABA2D88B47F38905441B7D79E65D"
    },
    "cell_type" : "code",
    "source" : [ "val sensorRef = sparkSession.read.parquet(s\"$workDir/$referenceFile\")\n", "sensorRef.cache()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "9CBAE4C0C1EB48DBB802A0CC2960B2DB"
    },
    "cell_type" : "markdown",
    "source" : "(Parquet files preserve the schema information, which we can retrieve from the DataFrame)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "presentation" : {
        "tabs_state" : "{\n  \"tab_id\": \"#tab188158199-0\"\n}",
        "pivot_chart_state" : "{\n  \"hiddenAttributes\": [],\n  \"menuLimit\": 200,\n  \"cols\": [],\n  \"rows\": [],\n  \"vals\": [],\n  \"exclusions\": {},\n  \"inclusions\": {},\n  \"unusedAttrsVertical\": 85,\n  \"autoSortUnusedAttrs\": false,\n  \"inclusionsInfo\": {},\n  \"aggregatorName\": \"Count\",\n  \"rendererName\": \"Table\"\n}"
      },
      "id" : "774E09B823774494979FC8FE6AE3078F"
    },
    "cell_type" : "code",
    "source" : [ "sensorRef.schema" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "BFF9E80BAB5C4D468154BCD6FB5C6501"
    },
    "cell_type" : "markdown",
    "source" : "## We create our Streaming Context"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "83B2B58C552240BBA6FF518B1AD274EB"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.streaming.StreamingContext\n", "import org.apache.spark.streaming.Seconds\n", "\n", "val streamingContext = new StreamingContext(sparkContext, Seconds(2))" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "4E6DBF82BB6F4B6584233CD460F67263"
    },
    "cell_type" : "markdown",
    "source" : "## Our stream source will be a a Direct Kafka Stream\n"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "DF03F66BDDE0447B8202D39F2C0202E2"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.kafka.clients.consumer.ConsumerRecord\n", "import kafka.serializer.StringDecoder\n", "import org.apache.spark.streaming.kafka._\n", "\n", "val kafkaParams = Map[String, String](\n", "  \"metadata.broker.list\" -> kafkaBootstrapServer,\n", "  \"group.id\" -> \"iot-data-group\",\n", "  \"auto.offset.reset\" -> \"largest\",\n", "  \"enable.auto.commit\" -> (false: java.lang.Boolean).toString\n", ")\n", "\n", "val topics = Set(topic)\n", "@transient val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](\n", "     streamingContext, kafkaParams, topics)\n", "\n", "// @transient val stream = KafkaUtils.createDirectStream[String, String](\n", "//   streamingContext,\n", "//   PreferConsistent,\n", "//   Subscribe[String, String](topics, kafkaParams)\n", "// )\n", "\n" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "CCCB597031E7451FB59D18BA85C0E4A4"
    },
    "cell_type" : "markdown",
    "source" : "# Providing Schema information for our streaming data\nNow that we have a DStream of fresh data processed in a 2-second interval, we can start focusing on the gist of this example.\nFirst, we want to define and apply a schema to the data we are receiving.\nIn Scala, we can define a schema with a `case class`"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "E7A917C393654969812E6E38223BBA52"
    },
    "cell_type" : "code",
    "source" : [ "case class SensorData(sensorId: Int, timestamp: Long, value: Double)" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "9AD1ACAD450E44DA8C046EB48CD4EE5A"
    },
    "cell_type" : "markdown",
    "source" : "Now we apply that schema to the dstream, using the `flatMap` function.\n\nWe use `flatMap` instead of a `map` because there might be cases when the incoming data is incomplete or corrupted.\nIf we would use `map`, we would have to provide a resulting value for each transformed record. \nThat is something we cannot do for invalid records.\nWith `flatMap` in combination with `Option`, we can represent valid records as `Some(recordValue)` and invalid records as `None`.\nBy the virtue of `flatMap` the internal `Option` container gets flattend and our resulting stream will only contain valid `recordValue`s.\n\nDuring the parsing of the comma separated records, we not only protect ourselves against missing fields, but also parse the numeric values to their expected types. The surrounding `Try` captures any `NumberFormatException` that might arise from invalid records."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "5285C2BBC1854F059AB8E1D0244AE1C7"
    },
    "cell_type" : "code",
    "source" : [ "import scala.util.Try\n", "val schemaStream = stream.flatMap{case (id, record) => \n", "                                  val fields = record.split(\",\")\n", "                                  if (fields.size == 3) {\n", "                                    Try (SensorData(fields(0).toInt, fields(1).toLong, fields(2).toDouble)).toOption\n", "                                  } else { None }\n", "                                 }" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "7A7C144384904E96BE66A649BD193C15"
    },
    "cell_type" : "markdown",
    "source" : "# Enrich the streaming data, without dropping records.\nWith the schema stream in place, we can proceed to transform the underlying RDDs into DataFrames.\n\nAs in the previous notebook, we are going to use the reference data to add the specific sensor information.\nPreviously, we used the default 'join', which is an inner-join that requires the join key to be available on both sides of the join.\nThis causes us to drop all data records for which we don't know the id. Given that new sensors might become available or misconfigured sensors might be sending an incorrect id, we would like to preserve all records in order to reconcile them in a latter stage.\n\nAs before, we do this in the context of the general-purpose action `foreachRDD`. "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "99696609577F49DB809AF94C319CB449"
    },
    "cell_type" : "code",
    "source" : [ "val stableSparkSession = sparkSession\n", "import stableSparkSession.implicits._\n", "import org.apache.spark.sql.SaveMode.Append\n", "schemaStream.foreachRDD{rdd => \n", "                        val sensorDF = rdd.toDF()\n", "                        val sensorWithInfo = sensorRef.join(broadcast(sensorDF), Seq(\"sensorId\"), \"rightouter\")\n", "                        val unknownSensors = sensorWithInfo.filter($\"sensorType\".isNull) \n", "                        val knownSensors = sensorWithInfo.filter(!$\"sensorType\".isNull) \n", "                        val denormalizedSensorData =\n", "                            knownSensors.withColumn(\"dnvalue\", $\"value\"*($\"maxRange\"-$\"minRange\")+$\"minRange\")\n", "                        val sensorRecords = denormalizedSensorData.drop(\"value\", \"maxRange\", \"minRange\")\n", "                        val ts= System.currentTimeMillis\n", "                        sensorRecords.write.format(\"parquet\").mode(Append).save(s\"$workDir/$targetFile\")\n", "                        unknownSensors.write.format(\"parquet\").mode(Append).save(s\"$workDir/$unknownSensorsTargetFile\")\n", "                       }" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "DEAD82F07F53459D8D7213998F106D0F"
    },
    "cell_type" : "markdown",
    "source" : "# Custom Streaming Listener\nThis sample custom listener shows how to implement a Streaming Custom Listener to receive updates about our streaming application evetns and progress.\nWe have opted for a UI data display: This custom listener produces a Notebook Widget that reactively receives and displays the notified data from the `StreamingListener` interface.\nThat way we can visually explore the execution lifecycle of this Spark Streaming Job"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "4D17FFA32D9049A58CB55FD17BDD4755"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.streaming.scheduler._\n", "import scala.collection.immutable.Queue\n", "class NotebookTableStreamingListener() extends StreamingListener {\n", "  case class TableEntry(timestamp: Long, operation: String, target: String, duration: Option[Long])\n", "  object TableEntry {\n", "    def now() = System.currentTimeMillis()\n", "    def apply(operation: String, target: String, duration: Option[Long] = None): TableEntry = {\n", "      this(now(), operation, target, duration)\n", "    }\n", "  }\n", "  val dummyEntry = Seq(TableEntry(\"-\",\"-\"))\n", "  val table = new notebook.front.widgets.charts.TableChart[Seq[TableEntry]](dummyEntry)\n", "  var entries: List[TableEntry] = List() \n", "  val EventLimit = 40\n", "  def add(tableEntry: TableEntry) = {\n", "    entries = (tableEntry :: entries).take(EventLimit)\n", "    table.applyOn(entries)\n", "  }\n", "  \n", "  def batchName(batchInfo: BatchInfo):String = {\n", "    \"batch-\" + batchInfo.batchTime\n", "  }\n", "  def shortOutputOperationDescription(outOp: OutputOperationInfo) : String = {\n", "    outOp.description.split(\"\\n\").headOption.getOrElse(\"-\")\n", "  }\n", "    \n", "  /** Called when the streaming has been started */\n", "  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {\n", "    add(TableEntry(\"stream started\", \"-\"))\n", "  }\n", "\n", "  /** Called when a receiver has been started */\n", "  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit =  {\n", "    add(TableEntry(\"receiver started\", receiverStarted.receiverInfo.name))\n", "  }\n", "\n", "  /** Called when a receiver has reported an error */\n", "  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {\n", "    add(TableEntry(\"receiver error\", receiverError.receiverInfo.lastError))\n", "  }\n", "\n", "  /** Called when a receiver has been stopped */\n", "  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) =  {\n", "    add(TableEntry(\"receiver stopped\", receiverStopped.receiverInfo.name))\n", "  }\n", "\n", "  /** Called when a batch of jobs has been submitted for processing. */\n", "  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = {\n", "    add(TableEntry(\"batch submitted\", batchName(batchSubmitted.batchInfo)))\n", "  }\n", "\n", "  /** Called when processing of a batch of jobs has started.  */\n", "  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {\n", "    add(TableEntry(\"batch started\", batchName(batchStarted.batchInfo)))\n", "  }\n", "\n", "  /** Called when processing of a batch of jobs has completed. */\n", "  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {\n", "    add(TableEntry(\"batch completed\", batchName(batchCompleted.batchInfo), batchCompleted.batchInfo.totalDelay))\n", "  }\n", "\n", "  /** Called when processing of a job of a batch has started. */\n", "  override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {\n", "    add(TableEntry(\"output operation started\", shortOutputOperationDescription(outputOperationStarted.outputOperationInfo)))\n", "  }\n", "\n", "  /** Called when processing of a job of a batch has completed. */\n", "  override def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {\n", "    add(TableEntry(\"output operation completed\", shortOutputOperationDescription(outputOperationCompleted.outputOperationInfo), outputOperationCompleted.outputOperationInfo.duration))\n", "  }\n", "\n", "}\n" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "85D8972DB3584F748E467AAEF68ED742"
    },
    "cell_type" : "markdown",
    "source" : "## Create an instance of the listener that we just defined"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "841F9D154A9841D7AC4846CE84100E41"
    },
    "cell_type" : "code",
    "source" : [ "val customTableListener = new NotebookTableStreamingListener()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "256641FD99D245678304F0BA2EEFCC18"
    },
    "cell_type" : "markdown",
    "source" : "## Add the listener to the streaming context so that it can receive callbacks from the different lifecycle points"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "E6285DF57D0F44F4804924505C29A0E7"
    },
    "cell_type" : "code",
    "source" : [ "streamingContext.addStreamingListener(customTableListener)" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "8F6454492ABD4E81A5BB81E2FDB8BB7B"
    },
    "cell_type" : "markdown",
    "source" : "## We add the table widget to our notebook to render it"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "CE2B2D0436CB45B08D3DAF05C1A6AFCD"
    },
    "cell_type" : "code",
    "source" : [ "customTableListener.table" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "6511488CAD4041658603F0CF485BC548"
    },
    "cell_type" : "markdown",
    "source" : "## Start the streaming context so that the streaming process can start.\nWatch the table for updates with data about the execution of our streaming context."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "F366201F2275412F818532AB671A55BC"
    },
    "cell_type" : "code",
    "source" : [ "streamingContext.start()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "B6F0075E9BB04467858CABAA000489EF"
    },
    "cell_type" : "code",
    "source" : [ "// Be careful not to stop the context if you want the streaming process to continue\n", "streamingContext.stop(false)" ],
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}